🎯 标准 DDD 架构(活动取消 Saga 完整流程)
让我以活动取消退款为例,完整梳理:
1️⃣ 正常流程(用户主动取消)
用户点击取消
↓
Controller (Trigger Trigger↓ 调用
ActivityApplicationService.cancelActivity(activityId, userId)
↓ 执行
ActivityCancellationSaga.execute(command)
├─ Step 1: 取消活动(更新状态)
│ └─ sagaStateRepository.update(state) ✅ 记录到数据库
├─ Step 2: 创建退款记录
│ ├─ ├─isStepCompleted("CREATE_REFUND_RECORDS") ?
│ ├─ 标记: markStepStarting("CREATE_REFUND_RECORDS")
│ ├─ 持久化: sagaStateRepository.update(state) ✅
│
│用: paymentPort.createRefundRecords()
│
│ markStepCompleted("CREATE_REFUND_RECORDS")
│ └─ 持久化: sagaStateRepository.update(state) ✅
├─ Step 3: 执行批量退款
│ ├─ 检查: isStepCompleted("PROCESS_REFUNDS") ?
│ ├─ 标记: markStepStarting("PROCESS_REFUNDS")
│ REFUNDS")
│teRepository.update(state) ✅
│ ├─ 远程调用: paymentPort.batchRefund()
│ ├─ 标记: markStepCompleted("PROCESS_REFUNDS")
│ └─ 持久化: sagaStateRepository.update(state) ✅
└─ Step 4: 发送通知(异步)
└─ domainEventPublisher.publish(ActivityCancelledEvent)2️⃣ 异常恢复流程(网络中断后)
定时任务触发(每 1 分钟)
SagaRecoveryTask (Trigger 层)
↓ 调用
↓
SagaRecoveryTask (Trigger 层)
↓ 调用ecoverPendingSagas()
↓ 实现
SagaRecoveryApplicationService (App 层)
├─ 查询: sagaStateRepository.findPendingSagas()
│ └─ 返回: 所有非终态的 Saga (STARTED, PROCESSING, COMPENSATING)
│
└─ 逐个恢复:
├─ 加载 Saga 实例: ActivityCancellationSaga
ancellationSagaetState(sagaState)
└─ 重新执行: saga.execute(command): 已完成 完成跳过
├─ Step 2: 检查 isStepCompleted("CREATE_REFUND_RECORDS")
│ ├─ 如果已完成: 调用 paymentPort.findRefundRecordsByIdempotencyKey(sagaId)
Key(sagaId)成: 重新执行(幂等)
└─ Step 3: 检查 isStepCompleted("PROCESS_REFUNDS")
├─ 如果已完成: 已完成: └─ 如果未完成: 重新执行(幂等)3️⃣ 补偿流程(Saga 失败)
Saga 执行失败
↓
saga.compensate()
├─ Step 3 补偿: 撤销退款(如果已退款)
├─ Step 2 补偿: 删除退款记录
└─ Step 1 补偿: 偿:活动状态✅ 标准 DDD 六边形架构
完整依赖关系
┌─────────────────────────────────────────────────────────┐
│ Trigger
│ - ActivityController (用户请求)
│ - SagaRecoveryTask (定时任务)
│ - RefundRetryTask (定时任务)
└────────────────┬────────────────────────────────────────┘
用 Input Port────────────────────────────────────────────────┐
│ Domain 层 (Input Port - 业务契约)
│ - IActivityApplicationService
│ - ISagaRecoveryApplicationService
│ - SagaRecoveryApplicationServiceicationService
└────────────────┬────────────────────────────────────────┘
│ 实现
▼
┌─────────────────────────────────────────────────────────┐
│ App
│ - ActivityApplicationService
│ - ApplicationServiceApplicationService
│ - RecoveryApplicationService
│ - RefundRetryApplicationService │
└────────────────┬────────────────────────────────────────┘
│ 调用
▼
┌─────────────────────────────────────────────────────────┐
│ Domain 层 (核心业务逻辑)
│ - ActivityCancellationSaga (取消活动)
│ - ActivityRegistrationSaga (报名活动
│ - ActivityRegistrationSaga (报名活动类)
│ - SagaState (Saga 状态)
└────────────────┬────────────────────────────────────────┘
│ 调用 Output Port
▼
┌─────────────────────────────────────────────────────────┐
│ Domain 层 (Output Port - │
│ - ISagaStateRepository (Saga 状态仓储接口)
│ - IActivityAggregateRepository (活动仓储接口)
│ - SagaStateRepository (Saga 状态仓储接口)
│
┌─────────────────────────────────────────────────────────┐
│ Infrastructure 层 (技术实现)- PaymentAdapter (微信支付适配器)
│ - SagaStateRepository (Saga 状态仓储实现)
│ - ActivityAggregateRepository (活动仓储实现)
└─────────────────────────────────────────────────────────┘问题 2:职责清晰度
| 类名 | 职责 | 层次 |
|---|---|---|
| SagaRecoveryTask | 定时触发 Saga 恢复 | Trigger 层 |
| SagaRecoveryApplicationService | 编排 Saga 恢复逻辑 | App 层 |
| ActivityCancellationSaga | 活动取消业务流程 | Domain 层 |
| SagaStateRepository | Saga 状态持久化 | Infrastructure 层 |
📝 总结:活动取消 Saga 的完整链路
用户视角
用户点击"取消活动"
→ 后台执行 行aga(4 个步骤)
→ 每个步骤都记录到数据库
→ 如果中断,定时任务自动恢复
→ 最终退款成功,用户收到通知技术视角
1. 幂等设计:每个步骤都可重复执行
2. 状态机:记录已完成的步骤,避免重复
3. 持久化:每个关键步骤都保存到数据库
4. 自动恢复:定时任务扫描未完成的 Saga
5. 补偿机制:失败时逆向回滚为什么需要这么复杂?
场景 1:网络中断
Step 2 执行到一半,微信支付接口调用成功
但 sagaStateRepository.update() 失败(网络断开)
定时任务恢复时,通过 isStepCompleted() 检查
调用 paymentPort.findRefundRecordsByIdempotencyKey() 查询结果
跳过 Step 2,继续执行 Step 3 ✅
场景 2:服务重启
Saga 执行到 Step 3,服务突然重启
重启后,定时任务扫描数据库
发现未完成的 Saga,重新加载
从 Step 3 继续执行 ✅
场景 3:微信支付失败
Step 3 退款失败(余额不足)
Saga 标记为 FAILED
RefundRetryTask 定时重试
或者触发补偿流程,恢复活动状态 ✅